<?php
namespace rakafka;

use \Kafka\ConsumerConfig;
use \Kafka\Consumer;

class KafkaConsumer
{
    #kafkaz主要配置
    private $config;

    public function __construct()
    {
        $this->config = ConsumerConfig::getInstance();
        $this->config->setMetadataRefreshIntervalMs(10000);
        $this->config->setMetadataBrokerList('172.31.32.150:9092');
        $this->config->setGroupId('default-group');
        $this->config->setBrokerVersion('0.9.0.1');
        $this->config->setTopics(array('deposit'));
    }

    public function consumer()
    {
        $consumer = new Consumer();
        $consumer->start(function($topic, $part, $message) {
            return [$topic, $part, $message['message']['value']];
        });
    }

}



